GH-45788: [C++][Acero] Fix data race in aggregate node #45789
Conversation
Hi @pitrou, would you like to take a look? Thanks.
@github-actions crossbow submit emscripten

Revision: 9d8ddbb
Submitted crossbow builds: ursacomputing/crossbow @ actions-73916ce3ac
Can we also make `Segmenter::GetSegments` a const fn? (Or I can do it in a separate patch.)

I guess not; see `arrow/cpp/src/arrow/compute/row/grouper.cc` line 163 and line 273 (in 7e18764).
```diff
@@ -312,7 +312,7 @@ Result<ExecBatch> GroupByNode::Finalize() {
                        segment_key_field_ids_.size());

   // Segment keys come first
-  PlaceFields(out_data, 0, segmenter_values_);
+  PlaceFields(out_data, 0, state->segmenter_values);
```
So, the `Finalize` step only considers the segmenter values for `state[0]`? I'm not sure I understand why.
See [1]: the `Finalize` call follows a `Merge` call, by which point the states of all threads have been merged into `state[0]`.

But this raises an interesting question that I only realized now: the proposed thread-local `segmenter_values` are never merged, so how can I append `segmenter_values` only in `state[0]`? This seems wrong, but it turns out to be OK because of some implicit assumptions quite far away:

- If `segmenter_values_` is not empty, then at least one segment key is specified, so we are executing single-threaded and only `state[0]` exists. This is detailed in my answer to the other comment.
- Otherwise, there are no segment keys and we are appending nothing.
Even though the fix actually works, it "feels" weirder than before: a shared `segmenter_values_` seems, at least conceptually, more reasonable (though it has a race and isn't really correct in a multi-threaded context). This makes me hesitate about the current fix. I think it all boils down to the "abstraction leak" in the original design:

- The segmenter abstraction seems designed to be independent of the assumption that there is no multi-threading for segmented cases (and honestly I am personally quite fond of this design).
- The existence of `segmenter_values_` strongly depends on the single-threaded assumption.

Now I will try to fix the race in another way that is more independent of this abstraction leak, leaving the latter to be addressed in the future.
[1] `arrow/cpp/src/arrow/acero/groupby_aggregate_node.cc`, lines 353 to 354 in fc0862a:

```cpp
RETURN_NOT_OK(Merge());
ARROW_ASSIGN_OR_RAISE(out_data_, Finalize());
```
Yes, that sounds reasonable. If we wanted to have multi-threaded segmented group-by, I suppose it would need a preparatory step to rechunk the input along segment boundaries?
It can be the case, but may not be that useful, because IMO part of the point of segmented aggregation is to emit the result in a streaming fashion: as soon as a segment is concluded to be "closed", we are sure that the current aggregation result is a valid partial result and can be output to downstream. Rechunking the input would require all input batches to be accumulated first.

To multi-thread the segmented aggregation, I would imagine the batches to be already partitioned by (and sorted by, of course; this is already implied by the current single-threaded impl) segment keys and distributed to specific threads (# partitions == # threads). This could be achieved by a special source node or a "shuffle" node.

This way each thread of the aggregate can process all rows belonging to a specific segment. This would require some modification to the current aggregate node, such as not merging other thread states, or a brand-new "partitioned aggregate" node.

Anyway, it's not trivial and can be quite restrictive to use.
BTW, I've redone the fix in a way that is less weird: c984de6 to f2af2d7.
Rationale for this change

Data race described in #45788.

What changes are included in this PR?

Put the racing member `segmenter_values` in thread-local state.

Are these changes tested?

Yes. UT added.

Are there any user-facing changes?

None.